package tcp

import (
	"bytes"
	"context"
	"encoding/binary"
	"fmt"
	"gitee.com/zhongguo168a/go-nodex/nodex/server"
	"gitee.com/zhongguo168a/gocodes/myx/errorx"
	"gitee.com/zhongguo168a/gocodes/myx/logx"
	"io"
	"net"
	"sync"
	"time"
)

type Connection struct {
	//当前Conn属于哪个Server
	Server server.IServer
	//当前连接的socket TCP套接字
	Conn net.Conn
	//当前连接的ID 也可以称作为SessionID，ID全局唯一
	ConnID uint64
	//消息管理MsgId和对应处理方法的消息管理模块
	MsgHandler server.IMsgHandle
	//告知该链接已经退出/停止的channel
	ctx    context.Context
	cancel context.CancelFunc
	//无缓冲管道，用于读、写两个goroutine之间的消息通信
	msgChan chan []byte
	//有缓冲管道，用于读、写两个goroutine之间的消息通信
	msgBuffChan chan []byte

	sync.RWMutex
	//链接属性
	property *sync.Map
	//当前连接的关闭状态
	isClosed bool
	//
	lastHeartBeatTime time.Time
}

// 创建连接的方法
func NewConntion(server server.IServer, conn net.Conn, connID uint64, msgHandler server.IMsgHandle) *Connection {
	//初始化Conn属性
	c := &Connection{
		Server:      server,
		Conn:        conn,
		ConnID:      connID,
		isClosed:    false,
		MsgHandler:  msgHandler,
		msgChan:     make(chan []byte),
		msgBuffChan: make(chan []byte, server.GetConfig().MaxMsgChanLen),
		property:    &sync.Map{},
	}

	//将新创建的Conn添加到链接管理中
	c.Server.GetConnMgr().Add(c)
	return c
}

/*
写消息Goroutine， 用户将数据发送给客户端
*/
func (c *Connection) StartWriter() {
	var (
		err error
	)
	defer func() {
		if err != nil {
			logx.Info(fmt.Sprintf("connection closed at writer: %v", err.Error()))
		}
		c.Stop()
	}()

	for {
		select {
		case data := <-c.msgChan:
			//有数据要写给客户端
			if _, msgerr := c.Conn.Write(data); err != nil {
				err = errorx.Wrap(msgerr, "write message")
				return
			}
			//fmt.Printf("Send data succ! data = %+v\n", data)
		case data, ok := <-c.msgBuffChan:
			if ok {
				//有数据要写给客户端
				if _, msgerr := c.Conn.Write(data); err != nil {
					err = errorx.Wrap(msgerr, "write message buff")
					return
				}
			} else {
				err = errorx.New("message buff closed")
				return
			}
		case <-c.ctx.Done():
			return
		}
	}
}

/*
读消息Goroutine，用于从客户端中读取数据
*/
func (c *Connection) StartReader() {
	var (
		err error
	)
	defer func() {
		if err != nil {
			logx.Info(fmt.Sprintf("connection closed at reader: %v", err.Error()))
		}
		c.Stop()
	}()

	var (
		// 消息长度
		length  = 0                                       // 消息长度uint32
		ulength = uint16(0)                               // 消息缓冲
		msgbuf  = bytes.NewBuffer(make([]byte, 0, 10240)) // 数据缓冲
		databuf = make([]byte, 4096)
		n       = 0
	)
	for {
		select {
		case <-c.ctx.Done():
			return
		default:
			n, err = c.Conn.Read(databuf)
			if err == io.EOF {
			}
			if err != nil {
				err = errorx.Wrap(err, "read connection")
				return
			}
			// 数据添加到消息缓冲
			n, err = msgbuf.Write(databuf[:n])
			if err != nil {
				err = errorx.Wrap(err, "msgbuf.Write")
				return
			}
			// 消息分割循环
			for {
				// 消息头
				if length == 0 && msgbuf.Len() >= 2 { // uint16
					_ = binary.Read(msgbuf, binary.LittleEndian, &ulength)
					length = int(ulength)
					// 检查超长消息
					if int(length) >= c.Server.GetConfig().MaxPacketSize {
						err = errorx.New("协议长度非法")
						return
					}
				}
				// 消息体
				if length > 0 && msgbuf.Len() >= length {
					data := msgbuf.Next(length)
					reader := bytes.NewReader(data)
					req := &server.Request{
						Conn: c,
					}
					if unpackerr := server.UnpackReceivedData(reader, c.Server, c.MsgHandler, req); unpackerr != nil {
						err = errorx.Wrap(unpackerr, "UnpackReceivedData")
						return
					}
					//从绑定好的消息和对应的处理方法中执行对应的Handle方法
					doerr := c.MsgHandler.Do(req)
					if doerr != nil {
						err = errorx.Wrap(doerr, "do message")
						return
					}
					length = 0
				} else {
					break
				}
			}

		}
	}
}

// 启动连接，让当前连接开始工作
func (c *Connection) Start() {
	c.ctx, c.cancel = context.WithCancel(context.Background())
	//1 开启用户从客户端读取数据流程的Goroutine
	go c.StartReader()
	//2 开启用于写回客户端数据流程的Goroutine
	go c.StartWriter()
	//按照用户传递进来的创建连接时需要处理的业务，执行钩子方法
	c.Server.CallOnConnStart(c)
}

// 停止连接，结束当前连接状态M
func (c *Connection) Stop() {
	//fmt.Println("Conn Stop()...ConnID = ", c.ConnID)

	c.Lock()
	defer c.Unlock()

	//如果当前链接已经关闭
	if c.isClosed == true {
		return
	}
	c.isClosed = true
	//如果用户注册了该链接的关闭回调业务，那么在此刻应该显示调用
	c.Server.CallOnConnStop(c)

	// 关闭socket链接
	c.Conn.Close()
	//关闭Writer
	c.cancel()

	//将链接从连接管理器中删除
	c.Server.GetConnMgr().Remove(c)

	//关闭该链接全部管道
	close(c.msgBuffChan)
}

// 从当前连接获取原始的socket TCPConn
func (c *Connection) GetTCPConnection() *net.TCPConn {
	return c.Conn.(*net.TCPConn)
}

// 获取当前连接ID
func (c *Connection) GetConnID() uint64 {
	return c.ConnID
}

// 获取远程客户端地址信息
func (c *Connection) RemoteAddr() net.Addr {
	return c.Conn.RemoteAddr()
}

// 直接将Message数据发送数据给远程的TCP客户端
func (c *Connection) SendMsg(message server.IMessage) error {
	c.RLock()
	if c.isClosed == true {
		c.RUnlock()
		return errorx.New("connection closed when send msg")
	}
	c.RUnlock()

	//将data封包，并且发送
	buff := bytes.NewBuffer([]byte{})
	err := message.Pack(buff)
	if err != nil {
		return errorx.Wrap(err, "pack")
	}

	//写回客户端
	c.msgChan <- buff.Bytes()

	return nil
}

func (c *Connection) SendMsgBuff(message server.IMessage) error {
	c.RLock()
	if c.isClosed == true {
		c.RUnlock()
		return errorx.New("Connection closed when send buff msg")
	}
	c.RUnlock()

	//将data封包，并且发送
	buff := bytes.NewBuffer([]byte{})
	err := message.Pack(buff)
	if err != nil {
		return errorx.Wrap(err, "pack")
	}

	//写回客户端
	c.msgBuffChan <- buff.Bytes()

	return nil
}

// 清空缓冲区数据
func (c *Connection) Flush() error {
	return nil
}

// 获取链接属性
// 可通过mapsync访问属性
func (c *Connection) GetProperty() *sync.Map {
	return c.property
}
